package com.joysuccess.minaserver.minaserver;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.joysuccess.common.utils.HelpUtils;
import com.joysuccess.common.utils.SpringUtil;
import com.joysuccess.minaserver.constant.TopicName;
import com.joysuccess.minaserver.minaserver.raw.DecoderResultType;
import com.joysuccess.minaserver.rules.Alarm;
import com.joysuccess.minaserver.service.impl.MinaServiceImpl;
import lombok.extern.java.Log;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;


/**
 * Dedicated message-handling class that decouples network connection handling from
 * message processing.
 *
 * <p>Every decoded message body is parsed as a JSON array. When the first element carries a
 * {@code content} key the array is treated as a batch of alarms and each alarm is published to
 * {@link TopicName#ALL_MSG_COL_ALARM}; otherwise the array is treated as real-time point values
 * and is published once per known asset to {@link TopicName#DLJK_DATA_TOPIC_0001}.
 */
public class ServerHandler extends IoHandlerAdapter {
    private static final Logger LOGGER = LoggerFactory.getLogger(ServerHandler.class);

    // Raw type kept on purpose: the generic parameters of the registered bean are not visible here.
    private final KafkaTemplate kafkaTemplate = SpringUtil.getBean(KafkaTemplate.class);

    private final MinaServiceImpl minaService = SpringUtil.getBean(MinaServiceImpl.class);

    /** Logs that a client has started connecting. */
    @Override
    public void sessionCreated(IoSession session) throws Exception {
        LOGGER.info("客户端开始连接:{}", session);
    }

    /** Logs that a client connection has been opened. */
    @Override
    public void sessionOpened(IoSession session) throws Exception {
        LOGGER.info("客户端已连接:{}", session);
    }

    /** Logs that a client connection has been closed. */
    @Override
    public void sessionClosed(IoSession session) throws Exception {
        LOGGER.info("客户端已断开:{}", session);
    }

    /**
     * Logs an exception raised while handling the session.
     *
     * @param session the session on which the failure occurred
     * @param cause   the failure; logged together with its stack trace
     */
    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        // The trailing Throwable argument makes SLF4J print the stack trace as well.
        LOGGER.error("{}-minaserver error:{}", session, cause.getMessage(), cause);
    }

    /**
     * Handles one decoded message: strips the first and last character of the body, removes all
     * backslashes, parses the remainder as a JSON array and publishes it to Kafka either as
     * alarms or as real-time data.
     *
     * <p>Messages of an unexpected type, bodies that are too short to unwrap, and bodies that
     * parse to no array are logged and ignored instead of raising an exception.
     *
     * @param session the session the message arrived on
     * @param message the decoded message; expected to be a {@link DecoderResultType}
     */
    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        if (!(message instanceof DecoderResultType)) {
            LOGGER.warn("Ignoring message of unexpected type from {}: {}", session,
                    message == null ? null : message.getClass().getName());
            return;
        }
        DecoderResultType decoded = (DecoderResultType) message;
        LOGGER.info("{}", decoded.getInsType());

        String rawBody = decoded.getBody();
        if (rawBody == null || rawBody.length() < 2) {
            // substring(1, length - 1) below needs at least the two wrapping characters.
            LOGGER.warn("Ignoring message from {} with missing or too short body", session);
            return;
        }
        // Literal replace: drops every backslash without going through the regex engine.
        String body = rawBody.substring(1, rawBody.length() - 1).replace("\\", "");

        // parseArray returns null for empty or "null" text, so guard before touching the result.
        JSONArray points = JSONArray.parseArray(body);
        if (points == null || points.isEmpty()) {
            return;
        }

        JSONObject first = toJsonObject(points.get(0));
        if (first != null && first.containsKey("content")) {
            publishAlarms(points);
        } else {
            publishRealtimeData(points);
        }
    }

    /** Logs every message that has been sent. */
    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        LOGGER.info("messageSent {}", message);
    }

    /** Logs that the session has become idle. */
    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        LOGGER.info("sessionIdle:{}", session);
    }

    /** Converts every alarm point in the array to an {@link Alarm} and publishes it to Kafka. */
    private void publishAlarms(JSONArray points) {
        for (Object element : points) {
            JSONObject point = toJsonObject(element);
            if (point == null) {
                LOGGER.warn("Skipping null alarm entry");
                continue;
            }
            Integer status = point.getInteger("status");
            if (status == null) {
                // Comparing a null Integer with -1 would throw and abort the whole batch.
                LOGGER.warn("Skipping alarm entry without status: {}", point);
                continue;
            }
            if (status == -1) {
                // Entries with status -1 are never forwarded.
                continue;
            }
            Alarm alarm = new Alarm();
            alarm.setAlarmDescription(point.getString("content"));
            alarm.setAlarmLevel(point.getInteger("level"));
            alarm.setPointId(point.getString("pointCode"));
            alarm.setAlarmsStatus(status);
            alarm.setAlarmTime(point.getDate("date"));

            // Serialise once and reuse for both the log line and the Kafka record.
            String alarmJson = alarm.alarmToJsonString(alarm);
            LOGGER.info("{}", alarmJson);
            kafkaTemplate.send(TopicName.ALL_MSG_COL_ALARM, alarmJson);
        }
    }

    /**
     * Publishes the received point values once per known asset. The Kafka payload has the form
     * {@code {"assetId": <assetId>, "oidList": "<JSON string of pointCode -> pointValue>"}}.
     */
    private void publishRealtimeData(JSONArray points) {
        Map<String, Map<String, String>> equipPoints = minaService.getEquipPointsMap();
        LOGGER.info("获取测点数据：{}", equipPoints == null ? 0 : equipPoints.size());
        if (equipPoints == null || !HelpUtils.isNotEmpty(equipPoints)) {
            LOGGER.warn("SNMP 数据发送到 Kafka失败，因为还没有进行Redis的数据的初始化，继续循环，等待数据被初始化");
            return;
        }

        // The pointCode -> pointValue map does not depend on the asset, so build it only once.
        String oidsList;
        try {
            Map<String, String> resultMap = new HashMap<>();
            for (Object element : points) {
                JSONObject point = toJsonObject(element);
                if (point != null) {
                    resultMap.put(point.getString("pointCode"), point.getString("pointValue"));
                }
            }
            oidsList = JSONObject.toJSONString(resultMap);
        } catch (Exception e) {
            LOGGER.error("Failed to convert real-time points to JSON; nothing was sent", e);
            return;
        }

        // NOTE(review): every asset receives the full set of received points; the per-asset
        // point configuration held in the map values is not used for filtering — confirm intent.
        for (String assetId : equipPoints.keySet()) {
            try {
                JSONObject equipOids = new JSONObject();
                equipOids.put("assetId", assetId);
                equipOids.put("oidList", oidsList);
                kafkaTemplate.send(TopicName.DLJK_DATA_TOPIC_0001, equipOids.toJSONString());
            } catch (Exception e) {
                // Keep going so that one failing asset does not block the others.
                LOGGER.error("Failed to send real-time data for asset {}", assetId, e);
            }
        }
    }

    /**
     * Returns the array element as a {@link JSONObject}, re-parsing its string form only when it
     * is not already one.
     */
    private static JSONObject toJsonObject(Object element) {
        if (element instanceof JSONObject) {
            return (JSONObject) element;
        }
        return JSONObject.parseObject(String.valueOf(element));
    }
}
